package com.atuguigu.flink.Day05;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;

//实时对账，双流join
//当pay时，会拉起三方支付，当两个流同时满足时，则对账成功.延迟时间为5s
public class Example3_3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<Event> payStream = env
                .fromElements(
                        new Event("order1", "pay", 1000L),
                        new Event("order2", "pay", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        SingleOutputStreamOperator<Event> weixinStream = env.fromElements(
                new Event("order1", "weixin", 3000L),
                new Event("order3", "weixin", 4000L)
        )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        payStream
                .keyBy(r->r.order_id)
                .connect(weixinStream.keyBy(r->r.order_id))
                .process(new MatchFunction())
                .print();


        env.execute();
    }

    public static class MatchFunction extends CoProcessFunction<Event,Event,String>{


        //第一条流为 pay的流
        @Override
        public void processElement1(Event value, Context ctx, Collector<String> out) throws Exception {

        }

        //第二条流为weixin的流
        @Override
        public void processElement2(Event value, Context ctx, Collector<String> out) throws Exception {

        }
    }

    //POJO类
    public static class Event{
        public String order_id;
        public String eventType;
        public Long timestamp;

        public Event() {
        }

        public Event(String order_id, String eventType, Long timestamp) {
            this.order_id = order_id;
            this.eventType = eventType;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "order_id='" + order_id + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }

}
